package com.vaye.im.listener;

import com.alibaba.fastjson.JSON;
import com.vaye.common.constant.CacheConstant;
import com.vaye.common.enums.TopicTypeEnums;
import com.vaye.common.utils.IPUtils;
import com.vaye.im.dto.TopicMsg;
import com.vaye.im.websocket.ProductWebSocket2;
import com.vaye.im.websocket.ProductWebSocket21;
import com.vaye.im.websocket.ProductWebSocket31;
import com.vaye.im.websocket.PushWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.util.ObjectUtils;

import java.util.List;

/**
 * @author wangzhiyong
 * @date 2022年07月11日 下午2:55
 */
@Slf4j
public class RedisTopicListener implements MessageListener{

    private StringRedisSerializer stringRedisSerializer;

    private StringRedisTemplate stringRedisTemplate;

    private ProductWebSocket21 productWebSocket21;

    private ProductWebSocket31 productWebSocket31;

    @Autowired
    private PushWebSocket pushWebSocket;

    public RedisTopicListener(RedisMessageListenerContainer listenerContainer, List< ? extends Topic> topics) {
        listenerContainer.addMessageListener(this,topics);
    }

    public void setStringRedisSerializer(StringRedisSerializer stringRedisSerializer) {
        this.stringRedisSerializer = stringRedisSerializer;
    }

    public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Autowired
    public void setProductWebSocket21(ProductWebSocket21 productWebSocket21) {
        this.productWebSocket21 = productWebSocket21;
    }

    @Autowired
    public void setProductWebSocket31(ProductWebSocket31 productWebSocket31){
        this.productWebSocket31 = productWebSocket31;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String patternStr = stringRedisSerializer.deserialize(pattern);
        String channel = stringRedisSerializer.deserialize(message.getChannel());
        String body = stringRedisSerializer.deserialize(message.getBody());
        log.info("event = {}, message.channel = {},  message.body = {}", patternStr, channel, body);
        if(CacheConstant.WS_CHAT_TOPIC.equals(channel)) {
            TopicMsg topicMsg = JSON.parseObject(body, TopicMsg.class);
            if (ObjectUtils.isEmpty(topicMsg)) {
                log.error("topicMsg is empty");
                return;
            }
            String fromUserId = topicMsg.getFromUserId();
            String targetUserId = topicMsg.getTargetUserId();
            String roomId = topicMsg.getRoomId();
            String msg = topicMsg.getMsg();
            Integer type = topicMsg.getType();
            log.info("fromUserId={},targetUserId={},msg={},type={}",fromUserId,targetUserId,msg,type);
            if (CacheConstant.WS_CHAT_TOPIC.equals(channel)) {
                if (TopicTypeEnums.PEER_TO_PEER.getCode().equals(type)) {   //点对点聊天
                    productWebSocket21.send(Long.valueOf(fromUserId), Long.valueOf(targetUserId), msg);
                } else {
                    if (IPUtils.getLocalIP().equals(topicMsg.getIp())) {
                        log.info("此消息已经处理,无需重复处理ip={},localIp={}",topicMsg.getIp(),IPUtils.getLocalIP());
                        return;
                    }
                    productWebSocket31.multicastMessage(roomId,Long.valueOf(fromUserId),msg);
                }
            }
        } else if (CacheConstant.WS_PUSH_TOPIC.equals(channel)) {   //消息推送
            TopicMsg topicMsg = JSON.parseObject(body, TopicMsg.class);
            String targetUserId = topicMsg.getTargetUserId();
            String msg = topicMsg.getMsg();
            log.info("消息推送,targetUserId={},msg={}",targetUserId,msg);
            pushWebSocket.push(StringUtils.isEmpty(targetUserId)?null:Long.valueOf(targetUserId),String.format("%s:%s","系统消息",msg));
        }
    }
}
